/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.data.service.segments.records.upsert;

import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Objects;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.type.calculables.CalculableHolder;
import org.unidata.mdm.core.type.data.Attribute;
import org.unidata.mdm.core.type.data.DataRecord;
import org.unidata.mdm.core.type.data.RecordStatus;
import org.unidata.mdm.core.type.model.EntityElement;
import org.unidata.mdm.core.type.model.SourceSystemElement;
import org.unidata.mdm.core.type.timeline.TimeInterval;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.core.util.SecurityUtils;
import org.unidata.mdm.data.context.GetRecordTimelineRequestContext;
import org.unidata.mdm.data.context.RecordIdentityContextSupport;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.data.dto.UpsertRecordDTO;
import org.unidata.mdm.data.exception.DataExceptionIds;
import org.unidata.mdm.data.exception.DataProcessingException;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.data.po.data.RecordEtalonPO;
import org.unidata.mdm.data.po.data.RecordOriginPO;
import org.unidata.mdm.data.po.keys.RecordExternalKeysPO;
import org.unidata.mdm.data.service.impl.CommonRecordsComponent;
import org.unidata.mdm.data.service.segments.AttributesAutogenerationSupport;
import org.unidata.mdm.data.service.segments.ExternalIdAutogenerationSupport;
import org.unidata.mdm.data.type.apply.RecordUpsertChangeSet;
import org.unidata.mdm.data.type.data.OriginRecord;
import org.unidata.mdm.data.type.data.UpsertAction;
import org.unidata.mdm.data.type.keys.RecordEtalonKey;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.data.type.keys.RecordOriginKey;
import org.unidata.mdm.data.type.timeline.RecordTimeline;
import org.unidata.mdm.data.util.RecordFactoryUtils;
import org.unidata.mdm.data.util.StorageUtils;
import org.unidata.mdm.meta.configuration.Descriptors;
import org.unidata.mdm.system.type.pipeline.Start;
import org.unidata.mdm.system.type.runtime.MeasurementPoint;
import org.unidata.mdm.system.util.IdUtils;

/**
 * @author Mikhail Mikhailov
 *         Old 'ensure before' part of the ORC.
 */
@Component(RecordUpsertStartExecutor.SEGMENT_ID)
public class RecordUpsertStartExecutor
    extends Start<UpsertRequestContext, UpsertRecordDTO>
    implements
        RecordIdentityContextSupport,
        AttributesAutogenerationSupport,
        ExternalIdAutogenerationSupport {
    /**
     * This logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(RecordUpsertStartExecutor.class);
    /**
     * This segment ID.
     */
    public static final String SEGMENT_ID = DataModule.MODULE_ID + "[RECORD_UPSERT_START]";
    /**
     * Localized message code.
     */
    public static final String SEGMENT_DESCRIPTION = DataModule.MODULE_ID + ".record.upsert.start.description";
    /**
     * Common functionality.
     */
    @Autowired
    private CommonRecordsComponent commonRecordsComponent;
    /**
     * Meta model service.
     */
    @Autowired
    private MetaModelService metaModelService;
    /**
     * Constructor.
     */
    public RecordUpsertStartExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION, UpsertRequestContext.class, UpsertRecordDTO.class);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void start(UpsertRequestContext ctx) {
        MeasurementPoint.start();
        try {
            setup(ctx);
        } finally {
            MeasurementPoint.stop();
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String subject(UpsertRequestContext ctx) {
        MeasurementPoint.start();
        try {
            setup(ctx);
            RecordKeys keys = ctx.keys();
            return keys.getEntityName();
        } finally {
            MeasurementPoint.stop();
        }
    }

    protected void setup(UpsertRequestContext ctx) {

        if (ctx.setUp()) {
            return;
        }

        // 0. Run autogeneration. Entity name must exist on the context or batch insert keys for this to work.
        setupAutogeneration(ctx);

        // 1. Prepare change set
        setupFieldsBeforeKeys(ctx);

        // 2. In cases, other then batch insert try to load current timeline
        setupTimeline(ctx);

        // 3. Run verify
        setupVerify(ctx);

        // 4. Pre-process, fetch identity and TL, verify
        setupKeys(ctx);

        // 5. Set up remaining fields
        setupFieldsAfterKeys(ctx);

        ctx.setUp(true);
    }

    protected void setupAutogeneration(UpsertRequestContext ctx) {

        // 0. Stop on valid keys
        if (ctx.isValidRecordKey()) {
            return;
        }

        // 1. Select model element
        String entityName = selectEntityName(ctx);
        if (Objects.isNull(entityName)) {
            return;
        }

        EntityElement info = metaModelService.instance(Descriptors.DATA).getElement(entityName);

        // 2. Check code attributes generation
        setupAttributesAutogeneration(info, ctx);

        // 3. Check for "ext id autogeneration" rules turned on and apply them
        setupExternalIdAutogeneration(info, ctx);
    }

    protected void setupFieldsBeforeKeys(UpsertRequestContext ctx) {

        // 1. Change set
        if (Objects.isNull(ctx.changeSet())) {
            ctx.changeSet(new RecordUpsertChangeSet());
        }

        // 2. Set record timestamp
        Date ts = ctx.localTimestamp();
        if (ts == null) {
            ts = new Date(System.currentTimeMillis());
        }

        ctx.timestamp(ts);
    }
    protected void setupTimeline(UpsertRequestContext uCtx) {

        MeasurementPoint.start();
        try {

            RecordKeys keys = uCtx.keys();
            boolean batchInsert = uCtx.isBatchOperation()
                    && Objects.nonNull(keys)
                    && keys.getOriginKey() != null
                    && keys.getOriginKey().getRevision() == 0;

            // 1. In cases, other then batch insert try to load current timeline
            if (!batchInsert) {

                GetRecordTimelineRequestContext tlCtx = GetRecordTimelineRequestContext.builder(uCtx)
                        .fetchData(true)
                        .parentDraftId(uCtx.getParentDraftId())
                        .draftId(uCtx.getDraftId())
                        .build();

                tlCtx.keys(keys);

                Timeline<OriginRecord> current = commonRecordsComponent.loadTimeline(tlCtx);

                uCtx.currentTimeline(current);
                uCtx.keys(current.getKeys());

            } else {
                Timeline<OriginRecord> current = new RecordTimeline(keys);
                uCtx.currentTimeline(current);
            }

        } finally {
            MeasurementPoint.stop();
        }
    }
    protected void setupVerify(UpsertRequestContext ctx) {

        // 1. Check input (presence of records themselves)
        if (!ctx.isRecalculateTimeline() && !ctx.isEtalon() && !ctx.isOrigin()) {
            final String message = "Invalid upsert request context. Either data or or keys are invalid / missing. Upsert rejected.";
            LOGGER.warn(message, ctx);
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_UPSERT_NO_INPUT, ctx);
        }

        RecordKeys keys = ctx.keys();

        // 2. Check supplied keys validity.
        if (keys == null && ((ctx.isOrigin() && ctx.isOriginRecordKey()) || ctx.isEtalonRecordKey())) {
            final String message = "Record can not be identified by supplied keys. Upsert rejected.";
            LOGGER.warn(message, ctx);
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_UPSERT_INVALID_KEYS);
        }

        // 3. Origin is inactive, discard updates
        if (keys != null && keys.getOriginKey() != null && keys.getOriginKey().getStatus() == RecordStatus.INACTIVE && !ctx.isRecalculateTimeline()) {
            final String message = "Origin [Ext. ID: {}, Source system: {}, Entity name: {}] is inactive. Upsert rejected.";
            LOGGER.warn(message, ctx);
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_UPSERT_ORIGIN_INACTIVE,
                    keys.getOriginKey().getExternalId(),
                    keys.getOriginKey().getSourceSystem(),
                    keys.getOriginKey().getEntityName());
        } else if (keys != null && keys.getEtalonKey() != null && keys.getEtalonKey().getStatus() == RecordStatus.INACTIVE && !ctx.isRecalculateTimeline()) {
            final String message = "Etalon [ID: {}] is inactive. Upsert rejected.";
            LOGGER.warn(message, ctx);
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_UPSERT_ETALON_INACTIVE, keys.getEtalonKey().getId());
        }

        // 4. Check key combination validity
        if (ctx.isOrigin() && !ctx.isOriginExternalId() && !ctx.isOriginRecordKey()) {
            final String message = "Cannot upsert origin record. Neither valid external id nor origin record key has been supplied. Upsert rejected.";
            LOGGER.warn(message, ctx);
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_UPSERT_INVALID_ORIGIN_INPUT);
        }

        // 5. Check source system.
        if (ctx.isOrigin() && ctx.isOriginExternalId() && metaModelService.instance(Descriptors.SOURCE_SYSTEMS).getSourceSystem(ctx.getSourceSystem()) == null) {
            String message = "Valid source system should be defined.";
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_UPSERT_NO_SOURCE_SYSTEM);
        }
    }
    /**
     * Prepare origin upsert between.
     * Creates O/E records if necessary and resets keys in the context.
     * @param ctx the context to prepare
     */
    protected void setupKeys(UpsertRequestContext ctx) {

        MeasurementPoint.start();
        try {

            // 3. Process keys actually
            RecordKeys keys = ctx.keys();

            boolean hasEtalonRecord = keys != null && keys.getEtalonKey() != null && keys.getEtalonKey().getId() != null;
            boolean hasOriginRecord = keys != null && keys.getOriginKey() != null && keys.getOriginKey().getId() != null;

            if (hasEtalonRecord && hasOriginRecord) {
                return;
            }

            RecordUpsertChangeSet changeSet = ctx.changeSet();

            // 3.1. Create etalon
            if (!hasEtalonRecord) {
                keys = generateWithEtalonKey(ctx, changeSet);
            }

            // 3.2. Create origin
            if (!hasOriginRecord) {
                keys = updateWithOriginKey(ctx, changeSet, keys, hasEtalonRecord);
            }

            // 3.3. New records initialized to empty timeline, holding no keys
            // This causes problems. Update empty timeline with new keys.
            if (keys.isNew()) {
                Timeline<OriginRecord> t = ctx.currentTimeline();
                t.setKeys(keys);
            }

            ctx.keys(keys);

        } finally {
            MeasurementPoint.stop();
        }
    }

    protected void setupFieldsAfterKeys(UpsertRequestContext ctx) {

        RecordKeys keys = ctx.keys();
        Timeline<OriginRecord> current = ctx.currentTimeline();

        // 1. Set action type
        UpsertAction action = keys.isNew() ? UpsertAction.INSERT : UpsertAction.UPDATE;
        ctx.upsertAction(action);

        // 2. Check for mergeWithPreviousVersion flag and re-construct the record if possible
        if (action == UpsertAction.UPDATE && ctx.isMergeWithPreviousVersion()) {
            setupMergeWithPreviousVersion(ctx, keys, current);
        }
    }
    /**
     * Loads previous version of the data and merges the input into it.
     *
     * @param ctx current context
     * @param current current timeline
     * @return merged record
     */
    protected void setupMergeWithPreviousVersion(UpsertRequestContext ctx, RecordKeys keys, Timeline<OriginRecord> current) {

        if (Objects.isNull(keys) || Objects.isNull(keys.getOriginKey())) {
            return;
        }

        Date recordFrom = ctx.getValidFrom();
        Date recordTo = ctx.getValidTo();
        OriginRecord prevOrigin = null;
        TimeInterval<OriginRecord> selected = current.selectAsOf(nonNull(recordFrom) ? recordFrom : recordTo);
        if (Objects.nonNull(selected)) {
            for (CalculableHolder<OriginRecord> ch : selected) {
                if (ch.toBoxKey().equals(keys.getOriginKey().toBoxKey())) {
                    prevOrigin = ch.getValue();
                    break;
                }
            }
        }

        if (isNull(prevOrigin)) {
            return;
        }

        // 1st level only
        DataRecord upsert = ctx.getRecord();
        for (Attribute attr : prevOrigin.getAttributeValues()) {
            if (upsert.getAttribute(attr.getName()) != null) {
                continue;
            }
            upsert.addAttribute(attr);
        }
    }

    private RecordKeys generateWithEtalonKey(UpsertRequestContext ctx, RecordUpsertChangeSet changeSet) {

        Date ts = ctx.timestamp();
        String user = SecurityUtils.getCurrentUserName();

        RecordEtalonPO etalon = RecordFactoryUtils.createRecordEtalonPO(ctx, null, RecordStatus.ACTIVE);
        changeSet.setEtalonRecordInsertPO(etalon);

        return RecordKeys.builder()
                .etalonKey(RecordEtalonKey.builder()
                        .id(etalon.getId())
                        .status(etalon.getStatus())
                        .build())
                .entityName(etalon.getName())
                .shard(etalon.getShard())
                .node(StorageUtils.node(etalon.getShard()))
                .createDate(ts)
                .createdBy(user)
                .updateDate(ts)
                .updatedBy(user)
                .build();
    }

    private RecordKeys updateWithOriginKey(UpsertRequestContext ctx, RecordUpsertChangeSet changeSet, RecordKeys keys, boolean hasEtalonRecord) {

        Date ts = ctx.timestamp();
        String user = SecurityUtils.getCurrentUserName();

        RecordOriginPO record = RecordFactoryUtils.createRecordOriginPO(ctx, keys, RecordStatus.ACTIVE);
        RecordExternalKeysPO recordEk = new RecordExternalKeysPO();
        recordEk.setExternalId(record.getExternalId(), record.getName(), record.getSourceSystem());
        recordEk.setEtalonId(UUID.fromString(record.getEtalonId()));

        RecordOriginPO system = null;
        RecordExternalKeysPO systemEk = null;

        // Check for first upsert and create
        // UD origin, if the upsert is not a UD upsert.
        SourceSystemElement sse =  metaModelService.instance(Descriptors.SOURCE_SYSTEMS).getAdminElement();
        if (!hasEtalonRecord && !sse.getName().equals(record.getSourceSystem())) {

            UpsertRequestContext sysCtx = UpsertRequestContext.builder()
                    .sourceSystem(sse.getName())
                    .entityName(record.getName())
                    .externalId(IdUtils.v1String())
                    .build();

            RecordKeys sysKeys = RecordKeys.builder()
                    .etalonKey(keys.getEtalonKey())
                    .shard(keys.getShard())
                    .build();

            sysCtx.timestamp(ts);

            system = RecordFactoryUtils.createRecordOriginPO(sysCtx, sysKeys, RecordStatus.ACTIVE);

            systemEk = new RecordExternalKeysPO();
            systemEk.setExternalId(system.getExternalId(), system.getName(), system.getSourceSystem());
            systemEk.setEtalonId(UUID.fromString(system.getEtalonId()));
        }

        RecordOriginKey uKey = RecordOriginKey.builder()
                .entityName(record.getName())
                .externalId(record.getExternalId())
                .id(record.getId())
                .initialOwner(record.getInitialOwner())
                .sourceSystem(record.getSourceSystem())
                .status(record.getStatus())
                .enrichment(false)
                .build();

        RecordOriginKey sKey = system == null
            ? null
            : RecordOriginKey.builder()
                .entityName(system.getName())
                .externalId(system.getExternalId())
                .id(system.getId())
                .initialOwner(record.getInitialOwner())
                .sourceSystem(system.getSourceSystem())
                .status(system.getStatus())
                .enrichment(false)
                .build();

        changeSet.getOriginRecordInsertPOs().add(record);
        changeSet.getExternalKeysInsertPOs().add(recordEk);
        if (Objects.nonNull(system)) {
            changeSet.getOriginRecordInsertPOs().add(system);
            changeSet.getExternalKeysInsertPOs().add(systemEk);
        }

        return RecordKeys.builder(keys)
                .originKey(uKey)
                .supplementaryKeys(sKey == null ? Collections.singletonList(uKey) : Arrays.asList(uKey, sKey))
                .updateDate(ts)
                .updatedBy(user)
                .build();
    }
}
